You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 秦寒 <ha...@chinaums.com> on 2020/04/15 07:11:54 UTC

关于FLINK PYTHON UDF

你好

       我在使用kafka produce数据后,在python中使用UDF做一个add function,但
是最后的sink文件里面没有任何数据,

如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG
很久也不清楚是什么原因是否能帮忙分下

 

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 



 



 

 

测试结果

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}



 

 

 

st_env.from_path("source")\
    .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
    .select("add(b1,c1)") \ 无任何输出
    .insert_into("result_tab")

无任何输出



 

 

st_env.from_path("source")\
    .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
    .select("c1")\   #正常输出

    .insert_into("result_tab")

正确输出




Re: 关于FLINK PYTHON UDF

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
我刚刚在本地完全模拟了你的数据和核心的代码,是可以在sink里拿到结果的。
我把我的测试代码放到附件里面了,
你可以参考一下,如果还是不行的话,可以提供下你的代码再帮你看一下

Best,
Xingbo


秦寒 <ha...@chinaums.com> 于2020年4月15日周三 下午3:16写道:

> 你好
>
>        我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink
> 文件里面没有任何数据,
>
> 如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG很久也不清楚是什么原因是否能帮忙分下
>
>
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
>
>
> *测试结果*
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
> st_env.from_path("source")\
>     .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
>     *.select("add(b1,c1)") \ **无任何输出*
>     .insert_into("result_tab")
>
> *无任何输出*
>
>
>
>
>
> st_env.from_path("source")\
>     .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
>     *.select("c1")\*   #正常输出
>
>
>     .insert_into("result_tab")
>
> *正确输出*
>
>