You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小学生 <20...@qq.com> on 2020/06/04 06:46:23 UTC
pyflink 嵌套使用函数出错
各位大佬好,初学pyflink,有一个问题需要帮忙解决下。
代码为:
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,TableConfig,BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
elements = 'aaa|bbb'
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
@udf(input_types=[DataTypes.STRING()],
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def split(x):
return x.strip().split("|")
# t_env.register_function("split", udf(lambda i: i.strip().split("|"), [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))
t_env.register_function("split", split)
#split拆分后为一个2元数组
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],
result_type=DataTypes.STRING())
def get(array, index):
return array[index]
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
result_type=DataTypes.STRING())
def convert(array):
return get(array, 0)
t_env.register_function("convert", convert)
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field('b', DataTypes.STRING())) \
.with_schema(Schema()
.field('b', DataTypes.STRING())) \
.create_temporary_table('mySink')
t_env.from_elements(elements)\
.alias('line')\
.select('split(line)')\
.alias('array')\
.select('convert(array) as b')\
.insert_into('mySink')\
.t_env.execute("convert_job")
报错为:
TypeError: 'UserDefinedFunctionWrapper' object is not callable
Re: pyflink 嵌套使用函数出错
Posted by 小学生 <20...@qq.com>.
您好,谢谢大佬的指导,排查了下确实是get部分的udf去掉即可,感恩
Re: pyflink 嵌套使用函数出错
Posted by Xingbo Huang <hx...@gmail.com>.
Hi, 小学生
我稍微修改了一下你的code(你的from_elements那样写按理说就没法运行)
code是能够正确运行的,你可以参考一下,你去掉的是不是有问题,或者你把你修改后的代码贴上来,再一起看看
from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings, TableConfig, \
BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
def test_test():
elements = 'aaa|bbb'
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
@udf(input_types=[DataTypes.STRING()],
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def split(x):
print(x.strip().split("|"))
return x.strip().split("|")
# t_env.register_function("split", udf(lambda i: i.strip().split("|"),
[DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))
t_env.register_function("split", split)
# split拆分后为一个2元数组
def get(arr, index):
return arr[index]
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
result_type=DataTypes.STRING())
def convert(arr):
return get(arr, 0)
t_env.register_function("convert", convert)
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field('b', DataTypes.STRING())) \
.with_schema(Schema()
.field('b', DataTypes.STRING())) \
.create_temporary_table('mySink')
t_env.from_elements([(elements,), ], ["a"]) \
.alias('line') \
.select('split(line) as arr') \
.select('convert(arr) as b') \
.insert_into('mySink')
t_env.execute("convert_job")
if __name__ == '__main__':
test_test()
Best,
Xingbo
小学生 <20...@qq.com> 于2020年6月4日周四 下午3:38写道:
> 大佬好,去掉了运行还是出错,一样的错误
Re: pyflink 嵌套使用函数出错
Posted by 小学生 <20...@qq.com>.
大佬好,去掉了运行还是出错,一样的错误
Re: pyflink 嵌套使用函数出错
Posted by Xingbo Huang <hx...@gmail.com>.
Hi, 小学生。
把函数get的标签udf给去掉,它只是普通的Python函数,不要加上@udf,加上之后就不是python的函数了。只有Python的UDF你才要加上@udf
Best,
Xingbo
小学生 <20...@qq.com> 于2020年6月4日周四 下午2:46写道:
> 各位大佬好,初学pyflink,有一个问题需要帮忙解决下。
>
>
> 代码为:
> from pyflink.table import StreamTableEnvironment, DataTypes,
> EnvironmentSettings,TableConfig,BatchTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.udf import udf
> from pyflink.datastream import StreamExecutionEnvironment
> elements = 'aaa|bbb'
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env,
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>
>
> @udf(input_types=[DataTypes.STRING()],
> result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def split(x):
> return x.strip().split("|")
> # t_env.register_function("split", udf(lambda i: i.strip().split("|"),
> [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))
> t_env.register_function("split", split)
> #split拆分后为一个2元数组
> @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],
> result_type=DataTypes.STRING())
> def get(array, index):
> return array[index]
>
>
> @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
> result_type=DataTypes.STRING())
> def convert(array):
> return get(array, 0)
>
>
> t_env.register_function("convert", convert)
>
>
> t_env.connect(FileSystem().path('/tmp/output')) \
> .with_format(OldCsv()
> .field('b',
> DataTypes.STRING())) \
> .with_schema(Schema()
> .field('b',
> DataTypes.STRING())) \
> .create_temporary_table('mySink')
>
>
> t_env.from_elements(elements)\
> .alias('line')\
> .select('split(line)')\
> .alias('array')\
> .select('convert(array) as b')\
> .insert_into('mySink')\
> .t_env.execute("convert_job")
>
> 报错为:
> TypeError: 'UserDefinedFunctionWrapper' object is not callable