You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈康 <84...@qq.com> on 2021/03/18 03:32:09 UTC
pyflink UDTF求助!
定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
毫无头绪、有大佬遇到过吗?谢谢!
好像是 udf 和 udtf 一起使用时出现的~下面有可复现的例子,谢谢
# Scalar UDF that runs a Keras MLP prediction (model/setup code elided in the
# post; `trueY` presumably comes from the omitted model call — not shown here).
class myKerasMLP(ScalarFunction):
def eval(self, *args):
...
# Return the prediction as 'y0|y1' (later split apart by the UDTF below).
return str(trueY[0][0]) + '|' + str(trueY[0][1])
注册UDF函数
# Wrap the model class as a SQL scalar UDF:
# seven STRING inputs -> one STRING prediction ('y0|y1').
myKerasMLP = udf(myKerasMLP(),
input_types=[DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()],
result_type=DataTypes.STRING())
# Table UDTF: split one 'x|y' string into a two-column row.
class SplitStr(TableFunction):
def eval(self, str_value):
str_arr = str_value.split('|')
# NOTE(review): the row is yielded twice, so every input emits two
# identical rows — presumably a paste duplicate (the later repro program
# yields only once); confirm which is intended.
yield str_arr[0], str_arr[1]
yield str_arr[0], str_arr[1]
注册UDTF函数
# Wrap SplitStr as a UDTF (STRING in -> (STRING, STRING) row out) and
# register both functions for use in SQL.
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])
t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)
==================
# Failing query: the Python scalar UDF is called inside a sub-query whose
# result column then feeds LATERAL TABLE(splitStr(...)). Per the reply in
# this thread, that combination hits FLINK-21856 and fails with
# java.lang.IndexOutOfBoundsException: Index: 7, Size: 7.
t_env.sql_query("""
select A.hotime ,
A.before_ta ,
A.before_rssi ,
A.after_ta ,
A.after_rssil ,
A.nb_tath ,
A.nb_rssith ,
nbr_rssi ,
nbr_ta
from (SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")
====================
报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
Traceback (most recent call last):
File
"C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
line 280, in <module>
t_env.execute('NT重连预测参数')
File
"D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
line 147, in deco
return f(*a, **kw)
File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
====================
这段SQL可以执行
# This query runs successfully: the UDF result is selected directly and
# written to the sink, with no LATERAL TABLE applied on top of it.
t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
source
""").insert_into("print_table")
------------------------------
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
+I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
------------------------------
====================== 简单可复现的例子 ========================
=======================SQL 源=================
/*
Navicat MySQL Data Transfer
Source Server : localhost
Source Server Version : 50717
Source Host : localhost:3306
Source Database : nufront-nt
Target Server Type : MYSQL
Target Server Version : 50717
File Encoding : 65001
Date: 2021-03-13 14:23:41
*/
-- Import script (Navicat MySQL export) creating the `test` table that the
-- repro program below reads through the JDBC connector.
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for test
-- ----------------------------
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
`hotime` varchar(5) DEFAULT NULL,
`before_ta` varchar(5) DEFAULT NULL,
`before_rssi` varchar(10) DEFAULT NULL,
`after_ta` varchar(5) DEFAULT NULL,
`after_rssil` varchar(10) DEFAULT NULL,
`nb_tath` varchar(5) DEFAULT NULL,
`nb_rssith` varchar(10) DEFAULT NULL,
-- `predict` holds a precomputed 'rssi|ta' pair, the same shape the UDF emits.
`predict` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of test
-- ----------------------------
INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65',
'22.30014|-63.884907');
INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65',
'20.598848|-65.127464');
INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65',
'14.919615|-66.15158');
================== 程序 ===================
# -*- coding: utf-8 -*
import logging
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
# Build the streaming execution environment and a blink-planner table
# environment. (The original also created an unused `table_env` without
# the environment settings; that dead assignment is removed.)
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# Required so Python UDFs can run: let Python workers use managed memory.
# BUG FIX: the original key was "python.fn-execution.4memory.managed"
# (stray '4'); Flink ignores unknown keys, so the setting never applied.
t_env.get_config().get_configuration().set_boolean(
    "python.fn-execution.memory.managed", True)
t_env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", "80m")
class SplitStr(TableFunction):
    """UDTF: split a 'left|right' string on '|' and emit one (left, right) row."""

    def eval(self, str_value):
        fields = str_value.split('|')
        yield fields[0], fields[1]
# Expose SplitStr to SQL as "splitStr": one STRING in, a (STRING, STRING)
# row out.
split_str = udtf(
    SplitStr(),
    DataTypes.STRING(),
    [DataTypes.STRING(), DataTypes.STRING()],
)
t_env.register_function("splitStr", split_str)
# NOTE(review): a second, byte-identical `class SplitStr(TableFunction)`
# definition stood here. It only rebound the module-level name after the
# UDTF instance had already been created and registered above, so it had
# no effect and is removed as dead duplicate code.
class myKerasMLP(ScalarFunction):
    """Repro stand-in for the real Keras-MLP scalar UDF.

    Concatenates every argument followed by '|', so the result carries a
    trailing separator (e.g. 'a|b|') — exactly the shape SplitStr splits.
    """

    def eval(self, *args):
        # str.join builds the string in one O(n) pass instead of the original
        # quadratic `a += u + "|"` loop; the output bytes are identical
        # (trailing '|' preserved, '' when called with no arguments, and a
        # TypeError for non-str args, same as before).
        return ''.join(u + '|' for u in args)
# Wrap the class as a SQL scalar function and register it under the name
# 'train_and_predict'. This repro variant takes only two STRING arguments
# (it is called as train_and_predict(nb_tath, nb_rssith) in the failing SQL).
train_udf = udf(
    myKerasMLP(),
    input_types=[DataTypes.STRING(), DataTypes.STRING()],
    result_type=DataTypes.STRING(),
)
t_env.register_function('train_and_predict', train_udf)
# Sink for the runnable fallback query: the 7 feature columns plus the raw
# prediction string emitted by the scalar UDF.
t_env.execute_sql("""
CREATE TABLE print_table (
hotime STRING ,
before_ta STRING ,
before_rssi STRING ,
after_ta STRING ,
after_rssil STRING ,
nb_tath STRING ,
nb_rssith STRING ,
predict STRING
) WITH (
'connector' = 'print'
)
""")
# JDBC source over the MySQL `test` table created by the SQL script above.
# NOTE(review): credentials are hard-coded in plaintext — acceptable only
# for a local repro, not for production code.
t_env.execute_sql("""
CREATE TABLE source (
hotime STRING ,
before_ta STRING ,
before_rssi STRING ,
after_ta STRING ,
after_rssil STRING ,
nb_tath STRING ,
nb_rssith STRING ,
predict STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/nufront-nt',
'table-name' = 'test',
'username' = 'root',
'password' = '123456'
)
""")
# Sink for the lateral-join query: the 7 feature columns plus the two
# columns (nbr_rssi, nbr_ta) produced by the splitStr UDTF.
t_env.execute_sql("""
CREATE TABLE predict_sink (
hotime STRING ,
before_ta STRING ,
before_rssi STRING ,
after_ta STRING ,
after_rssil STRING ,
nb_tath STRING ,
nb_rssith STRING ,
nbr_rssi STRING ,
nbr_ta STRING
) WITH (
'connector' = 'print'
)
""")
#######################################
## 可执行
#######################################
# t_env.sql_query("""
# select A.hotime ,
# A.before_ta ,
# A.before_rssi ,
# A.after_ta ,
# A.after_rssil ,
# A.nb_tath ,
# A.nb_rssith ,
# nbr_rssi ,
# nbr_ta
# from (SELECT
# hotime ,
# before_ta ,
# before_rssi ,
# after_ta ,
# after_rssil ,
# nb_tath ,
# nb_rssith ,
# predict
# FROM
# source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")
#######################################
## 执行报错
#######################################
# t_env.sql_query("""
# select A.hotime ,
# A.before_ta ,
# A.before_rssi ,
# A.after_ta ,
# A.after_rssil ,
# A.nb_tath ,
# A.nb_rssith ,
# nbr_rssi ,
# nbr_ta
# from (SELECT
# hotime ,
# before_ta ,
# before_rssi ,
# after_ta ,
# after_rssil ,
# nb_tath ,
# nb_rssith ,
# train_and_predict(nb_tath, nb_rssith) predict
# FROM
# source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")
#######################################
## 可执行
#######################################
# Runnable variant: the scalar UDF output goes straight to the print sink,
# with no LATERAL TABLE applied over it. The failing combination (UDF inside
# a sub-query feeding LATERAL TABLE) is kept commented out above; see the
# reply in this thread pointing to FLINK-21856.
t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta) predict
FROM
source
""").insert_into("print_table")
t_env.execute('pyflink UDTF')
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: pyflink UDTF求助!
Posted by 陈康 <84...@qq.com>.
感谢回复!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: pyflink UDTF求助!
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1]
来记录这个问题了。目前的workaroud方案是使用Table API。
具体可以参考下面的代码:
>>>
# Workaround for FLINK-21856 (from the reply): run the projection with
# sql_query, then apply the UDTF through the Table API `join_lateral`
# instead of SQL LATERAL TABLE, avoiding the sub-query + Python UDF bug.
a = t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(nb_tath, nb_rssith) predict
FROM source
""")
result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)")
[1] https://issues.apache.org/jira/browse/FLINK-21856
Best,
Xingbo
陈康 <84...@qq.com> 于2021年3月18日周四 下午1:30写道:
> apache-flink 1.11.1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Re: pyflink UDTF求助!
Posted by 陈康 <84...@qq.com>.
apache-flink 1.11.1
--
Sent from: http://apache-flink.147419.n8.nabble.com/