Posted to user-zh@flink.apache.org by xuzh <hu...@foxmail.com> on 2021/10/18 10:44:34 UTC
pyflink 1.14.0 UDF fails at execution; code written following the official website
from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row


class HashCode(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor


env_settings = EnvironmentSettings.in_batch_mode()
btenv = TableEnvironment.create(env_settings)

hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# Use the Python user-defined function in the SQL API
btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
                          DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                         DataTypes.FIELD("b", DataTypes.STRING()),
                                         DataTypes.FIELD("c", DataTypes.FLOAT())]))
btenv.create_temporary_view("tb2", tb2)
tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
print(tb2.to_pandas())

# 3. Create the sink table
btenv.execute_sql("""
    CREATE TABLE rs (
        a int,
        b string,
        m bigint
    ) WITH (
        'connector' = 'print'
    )
""")

tb2.execute_insert("rs").wait()
print(tb2.to_pandas())
Re: pyflink 1.14.0 UDF fails at execution; code written following the official website
Posted by xuzh <hu...@foxmail.com>.
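The detailed error log is below. If I remove the UDF, the job runs; if I leave it in, it fails. Every UDF I write fails at execution. Do I need to import a specific JAR?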
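Since the job runs once the UDF is removed, one quick check is whether the function body itself is at fault. The sketch below runs the same arithmetic as `HashCode.eval` in plain Python, with no Flink involved. `HashCodeLogic` is a hypothetical stand-in name, not a PyFlink class. It relies on CPython hashing small integers to themselves, so for `a = 1` and `a = 2` the `m` column would be expected to hold 12 and 24.

```python
class HashCodeLogic:
    """Hypothetical stand-in mirroring HashCode.eval, without the PyFlink base class."""

    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor


logic = HashCodeLogic()
# Column "a" holds the integers 1 and 2 in the posted example.
results = [logic.eval(a) for a in (1, 2)]
print(results)  # [12, 24]
```

If this runs cleanly, the failure more likely lies in the Python worker environment than in the function itself.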
Re: pyflink 1.14.0 UDF fails at execution; code written following the official website
Posted by xuzh <hu...@foxmail.com>.
Re: pyflink 1.14.0 UDF fails at execution; code written following the official website
Posted by Dian Fu <di...@gmail.com>.
The image didn't come through; the mailing list can't carry images directly. Could you send more detailed log output?
On Tue, Oct 19, 2021 at 6:34 PM xuzh <hu...@foxmail.com> wrote:
> Error log
> Exception in thread Thread-14:
> Traceback (most recent call last):
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
>     self.run()
>   File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
>     while not self._finished.wait(next_call - time.time()):
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
>     signaled = self._cond.wait(timeout)
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
>     gotit = waiter.acquire(True, timeout)
> OverflowError: timeout value is too large
Re: pyflink 1.14.0 UDF fails at execution; code written following the official website
Posted by xuzh <hu...@foxmail.com>.
Error log
Exception in thread Thread-14:
Traceback (most recent call last):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
    while not self._finished.wait(next_call - time.time()):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
OverflowError: timeout value is too large
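The last frames of the traceback show `waiter.acquire(True, timeout)` rejecting its timeout argument. Python's `threading` module documents `threading.TIMEOUT_MAX` as the largest timeout accepted by blocking calls and raises `OverflowError` above it; the limit is platform-dependent. The following is a minimal sketch that reproduces only that exception with the standard library. It does not reproduce the Beam code path, and the doubled timeout is an arbitrary value chosen for illustration.

```python
import threading

event = threading.Event()  # never set, so wait() has to block on its timeout

# Any value above threading.TIMEOUT_MAX is rejected before blocking starts.
too_large = threading.TIMEOUT_MAX * 2

try:
    event.wait(too_large)
    raised = False
except OverflowError as exc:
    raised = True
    print("OverflowError:", exc)

print("platform limit in seconds:", threading.TIMEOUT_MAX)
```

Comparing the printed limit on the failing machine with the value that `next_call - time.time()` produces there may help narrow down where the oversized timeout comes from.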
Re: pyflink 1.14.0 UDF fails at execution; code written following the official website
Posted by Dian Fu <di...@gmail.com>.
I tried it and it runs fine for me. Could you post the error?