Posted to user-zh@flink.apache.org by xuzh <hu...@foxmail.com> on 2021/10/18 10:44:34 UTC

pyflink 1.14.0 udf execution error; code written following the official docs

from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row


class HashCode(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor


env_settings = EnvironmentSettings.in_batch_mode()
btenv = TableEnvironment.create(env_settings)

hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# Register the Python UDF for use in the SQL API
btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
                          DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                         DataTypes.FIELD("b", DataTypes.STRING()),
                                         DataTypes.FIELD("c", DataTypes.FLOAT())]))
btenv.create_temporary_view("tb2", tb2)
tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
print(tb2.to_pandas())

# 3. Create the sink table
btenv.execute_sql("""
       CREATE TABLE rs (
           a int,
           b string,
           m bigint
       ) WITH (
           'connector' = 'print'
       )
   """)

tb2.execute_insert("rs").wait()
print(tb2.to_pandas())
#
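For reference, the same UDF can also be invoked through Table API expressions (which is what the otherwise unused call import above is for). A minimal sketch, assuming the tb2 result table and the hash_code wrapper defined above; the alias "hashed_a" is just an illustrative name:

from pyflink.table.expressions import call, col

# Table API equivalent of the SQL call above; no name registration needed.
res = tb2.select(col("a"), col("b"), call(hash_code, col("a")).alias("hashed_a"))
print(res.to_pandas())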

Re: pyflink 1.14.0 udf execution error; code written following the official docs

Posted by xuzh <hu...@foxmail.com>.
The detailed error log is shown below. If I remove the UDF the job runs; with the UDF it fails. Every UDF I try fails the same way. Do I need to import a specific jar package?

[image: screenshot of the error log; the attachment was not delivered by the mailing list]
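On the jar question: with a pip-installed apache-flink 1.14.0, Python scalar UDFs run through the flink-python/Beam runtime that ships with the package, and the 'print' connector is built in, so this example should not need any extra jar. A minimal sketch that isolates just the UDF execution path (the function and table names here are made up for illustration):

from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

@udf(result_type=DataTypes.BIGINT())
def plus_one(i):
    return i + 1

t_env.create_temporary_function("plus_one", plus_one)
t_env.create_temporary_view("t", t_env.from_elements([(1,), (2,)], ['a']))

# If this already fails with the same error, the problem is in the Python UDF
# runtime/environment rather than in a missing connector jar.
print(t_env.sql_query("SELECT plus_one(a) AS b FROM t").to_pandas())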
------------------ Original Message ------------------
From: "user-zh" <dian0511.fu@gmail.com>
Sent: Tuesday, October 19, 2021, 10:51 AM
To: "user-zh" <user-zh@flink.apache.org>; "xuzh" <huazhenxu@foxmail.com>
Subject: Re: pyflink 1.14.0 udf execution error; code written following the official docs

I tried it and it runs fine on my side. Could you send the error message?

On Mon, Oct 18, 2021 at 6:44 PM xuzh <huazhenxu@foxmail.com> wrote:

> [quoted code from the original post, snipped]

Re: pyflink 1.14.0 udf execution error; code written following the official docs

Posted by Dian Fu <di...@gmail.com>.
The image didn't come through; the mailing list can't deliver images directly. Could you send more detailed log information?

On Tue, Oct 19, 2021 at 6:34 PM xuzh <hu...@foxmail.com> wrote:

> Error log:
> Exception in thread Thread-14:
> Traceback (most recent call last):
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
>     self.run()
>   File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
>     while not self._finished.wait(next_call - time.time()):
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
>     signaled = self._cond.wait(timeout)
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
>     gotit = waiter.acquire(True, timeout)
> OverflowError: timeout value is too large

Re: pyflink 1.14.0 udf execution error; code written following the official docs

Posted by xuzh <hu...@foxmail.com>.
Error log:
Exception in thread Thread-14:
Traceback (most recent call last):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
    while not self._finished.wait(next_call - time.time()):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
OverflowError: timeout value is too large
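
A side note on the traceback: the OverflowError is raised by CPython's threading layer, which rejects wait timeouts above threading.TIMEOUT_MAX; that limit is far lower on Windows (on the order of 49 days) than on Linux, which is presumably why the large timeout computed in Beam's data_plane periodic thread (next_call - time.time()) trips it on this Windows/Anaconda setup. A minimal, Flink-independent sketch that raises the same error:

import threading

# Platform-dependent upper bound for blocking waits, in seconds.
print(threading.TIMEOUT_MAX)

event = threading.Event()
try:
    # Any timeout above TIMEOUT_MAX is rejected inside the same
    # waiter.acquire(True, timeout) call that appears in the log.
    event.wait(threading.TIMEOUT_MAX * 2)
except OverflowError as exc:
    print(exc)  # "timeout value is too large"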

Re: pyflink 1.14.0 udf execution error; code written following the official docs

Posted by Dian Fu <di...@gmail.com>.
I tried it and it runs fine on my side. Could you send the error message?

On Mon, Oct 18, 2021 at 6:44 PM xuzh <hu...@foxmail.com> wrote:

> [quoted code from the original post, snipped]
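
Since the same script runs for one person and fails for another, comparing the two Python environments is a reasonable first step. A small sketch (nothing Flink-specific beyond the PyPI package names):

import platform
import sys

import pkg_resources

# Print OS, Python interpreter, and installed package versions so the working
# and failing environments can be compared side by side.
print(platform.platform())
print(sys.version)
for pkg in ("apache-flink", "apache-beam"):
    print(pkg, pkg_resources.get_distribution(pkg).version)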