You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈康 <84...@qq.com> on 2021/03/12 08:24:10 UTC
关于pyflink LATERAL TABLE 问题请教
定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
毫无头绪、有大佬遇到过吗?谢谢!
class myKerasMLP(ScalarFunction):
def eval(self, *args):
...
# 返回预测结果
return str(trueY[0][0]) + '|' + str(trueY[0][1])
注册UDF函数
myKerasMLP = udf(myKerasMLP(),
input_types=[DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()],
result_type=DataTypes.STRING())
class SplitStr(TableFunction):
    """Table function that splits a '|'-delimited string into two columns."""

    def eval(self, str_value):
        """Yield one row holding the first two '|'-separated fields.

        Raises IndexError when str_value contains no '|' separator.
        """
        str_arr = str_value.split('|')
        # Emit the pair once; a second identical yield would duplicate every
        # output row of the LATERAL TABLE join.
        yield str_arr[0], str_arr[1]
注册UDTF函数
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])
t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)
==================
t_env.sql_query("""
select A.hotime ,
A.before_ta ,
A.before_rssi ,
A.after_ta ,
A.after_rssil ,
A.nb_tath ,
A.nb_rssith ,
nbr_rssi ,
nbr_ta
from (SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")
====================
报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
Traceback (most recent call last):
File
"C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
line 280, in <module>
t_env.execute('NT重连预测参数')
File
"D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
line 147, in deco
return f(*a, **kw)
File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
====================
这段SQL可以执行
t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
source
""").insert_into("print_table")
------------------------------
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
+I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
------------------------------
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于pyflink LATERAL TABLE 问题请教
Posted by 陈康 <84...@qq.com>.
简单提供了下 可复现的例子,请帮忙看看~谢谢!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于pyflink LATERAL TABLE 问题请教
Posted by 陈康 <84...@qq.com>.
感谢回复:尝试着编辑简单可复现如下:请帮忙看看谢谢!
=======================SQL 源=================
/*
Navicat MySQL Data Transfer
Source Server : localhost
Source Server Version : 50717
Source Host : localhost:3306
Source Database : nufront-nt
Target Server Type : MYSQL
Target Server Version : 50717
File Encoding : 65001
Date: 2021-03-13 14:23:41
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for test
-- ----------------------------
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
`hotime` varchar(5) DEFAULT NULL,
`before_ta` varchar(5) DEFAULT NULL,
`before_rssi` varchar(10) DEFAULT NULL,
`after_ta` varchar(5) DEFAULT NULL,
`after_rssil` varchar(10) DEFAULT NULL,
`nb_tath` varchar(5) DEFAULT NULL,
`nb_rssith` varchar(10) DEFAULT NULL,
`predict` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of test
-- ----------------------------
INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65',
'22.30014|-63.884907');
INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65',
'20.598848|-65.127464');
INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65',
'14.919615|-66.15158');
================== 程序 ===================
# -*- coding: utf-8 -*-
import logging
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
# Stream execution environment that backs the table environments below.
env = StreamExecutionEnvironment.get_execution_environment()
# NOTE(review): table_env is not referenced again in the visible script; all
# later calls go through t_env. Kept so the module's names stay unchanged.
table_env = StreamTableEnvironment.create(env)
# Streaming mode on the Blink planner.
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
# Set these options so that Python UDFs can be used.
# The managed-memory key is "python.fn-execution.memory.managed"; it must be
# spelled exactly, since a misspelled key is presumably stored but never read.
t_env.get_config().get_configuration().set_boolean(
    "python.fn-execution.memory.managed", True)
t_env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", "80m")
class SplitStr(TableFunction):
    """Table function that splits a '|'-delimited string into two columns."""

    def eval(self, str_value):
        """Yield one row made of the first two '|'-separated fields."""
        fields = str_value.split('|')
        first, second = fields[0], fields[1]
        yield first, second
# Wrap SplitStr as a UDTF (one STRING argument in, two STRING columns out)
# and expose it to SQL under the name "splitStr".
splitStr = udtf(
    SplitStr(),
    input_types=DataTypes.STRING(),
    result_types=[DataTypes.STRING(), DataTypes.STRING()],
)
t_env.register_function("splitStr", splitStr)
# NOTE(review): second definition of SplitStr. The "splitStr" UDTF was built
# and registered from the earlier definition, and this name is not used again
# in the visible script.
class SplitStr(TableFunction):
    """Split a '|'-delimited string and emit its first two fields as a row."""

    def eval(self, str_value):
        """Yield a single (field0, field1) row taken from str_value."""
        pieces = str_value.split('|')
        yield pieces[0], pieces[1]
class myKerasMLP(ScalarFunction):
    """Scalar function that concatenates its string arguments with '|'."""

    def eval(self, *args):
        """Return every argument followed by a '|' separator, in call order.

        With no arguments the result is an empty string.
        """
        # Concatenate the arguments. Every argument has to be accumulated:
        # plain assignment inside a loop would keep only the last one.
        return ''.join(u + "|" for u in args)
# Wrap the scalar function as a UDF taking two STRING arguments and returning
# a STRING, then register it for SQL as "train_and_predict". The assignment
# rebinds the name myKerasMLP from the class to the wrapped UDF.
myKerasMLP = udf(
    myKerasMLP(),
    input_types=[DataTypes.STRING() for _ in range(2)],
    result_type=DataTypes.STRING(),
)
t_env.register_function('train_and_predict', myKerasMLP)
# Sink "print_table": the seven input columns plus the raw `predict` string,
# written through the 'print' connector.
t_env.execute_sql("""
CREATE TABLE print_table (
hotime STRING ,
before_ta STRING ,
before_rssi STRING ,
after_ta STRING ,
after_rssil STRING ,
nb_tath STRING ,
nb_rssith STRING ,
predict STRING
) WITH (
'connector' = 'print'
)
""")
# Source "source": reads the MySQL table `test` through the 'jdbc' connector.
# NOTE(review): the connection URL and credentials are hard-coded in this
# example.
t_env.execute_sql("""
CREATE TABLE source (
hotime STRING ,
before_ta STRING ,
before_rssi STRING ,
after_ta STRING ,
after_rssil STRING ,
nb_tath STRING ,
nb_rssith STRING ,
predict STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/nufront-nt',
'table-name' = 'test',
'username' = 'root',
'password' = '123456'
)
""")
# Sink "predict_sink": the seven input columns plus the two columns produced
# by splitting `predict` (nbr_rssi, nbr_ta), written through the 'print'
# connector.
t_env.execute_sql("""
CREATE TABLE predict_sink (
hotime STRING ,
before_ta STRING ,
before_rssi STRING ,
after_ta STRING ,
after_rssil STRING ,
nb_tath STRING ,
nb_rssith STRING ,
nbr_rssi STRING ,
nbr_ta STRING
) WITH (
'connector' = 'print'
)
""")
#######################################
## Case 1 - runs successfully:
## `predict` is read directly from the source table and passed to the
## splitStr UDTF through LATERAL TABLE.
#######################################
# t_env.sql_query("""
# select A.hotime ,
# A.before_ta ,
# A.before_rssi ,
# A.after_ta ,
# A.after_rssil ,
# A.nb_tath ,
# A.nb_rssith ,
# nbr_rssi ,
# nbr_ta
# from (SELECT
# hotime ,
# before_ta ,
# before_rssi ,
# after_ta ,
# after_rssil ,
# nb_tath ,
# nb_rssith ,
# predict
# FROM
# source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")
#######################################
## Case 2 - fails with an error when executed:
## `predict` is computed by the train_and_predict UDF inside the subquery
## and then passed to the splitStr UDTF through LATERAL TABLE.
#######################################
# t_env.sql_query("""
# select A.hotime ,
# A.before_ta ,
# A.before_rssi ,
# A.after_ta ,
# A.after_rssil ,
# A.nb_tath ,
# A.nb_rssith ,
# nbr_rssi ,
# nbr_ta
# from (SELECT
# hotime ,
# before_ta ,
# before_rssi ,
# after_ta ,
# after_rssil ,
# nb_tath ,
# nb_rssith ,
# train_and_predict(nb_tath, nb_rssith) predict
# FROM
# source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")
#######################################
## Case 3 - runs successfully:
## only the train_and_predict UDF is used (no LATERAL TABLE); the result is
## written to print_table.
#######################################
t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta) predict
FROM
source
""").insert_into("print_table")
# Submit the job under the name 'pyflink UDTF'.
t_env.execute('pyflink UDTF')
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于pyflink LATERAL TABLE 问题请教
Posted by 陈康 <84...@qq.com>.
apache-flink 1.11.1
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于pyflink LATERAL TABLE 问题请教
Posted by Dian Fu <di...@gmail.com>.
用的PyFlink版本是多少?另外,如果方便的话,可以提供一个比较容易复现的例子吗?
On Fri, Mar 12, 2021 at 4:57 PM 陈康 <84...@qq.com> wrote:
> 定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
> 毫无头绪、有大佬遇到过吗?谢谢!
>
> class myKerasMLP(ScalarFunction):
>
> def eval(self, *args):
> ...
> # 返回预测结果
> return str(trueY[0][0]) + '|' + str(trueY[0][1])
>
> 注册UDF函数
> myKerasMLP = udf(myKerasMLP(),
> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()],
> result_type=DataTypes.STRING())
>
> class SplitStr(TableFunction):
> def eval(self, str_value):
> str_arr = str_value.split('|')
> yield str_arr[0], str_arr[1]
> yield str_arr[0], str_arr[1]
>
> 注册UDTF函数
> splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
> DataTypes.STRING()])
>
> t_env.register_function('train_and_predict', myKerasMLP)
> t_env.register_function("splitStr", splitStr)
>
> ==================
>
> t_env.sql_query("""
> select A.hotime ,
> A.before_ta ,
> A.before_rssi ,
> A.after_ta ,
> A.after_rssil ,
> A.nb_tath ,
> A.nb_rssith ,
> nbr_rssi ,
> nbr_ta
> from (SELECT
> hotime ,
> before_ta ,
> before_rssi ,
> after_ta ,
> after_rssil ,
> nb_tath ,
> nb_rssith ,
> train_and_predict(hotime, before_ta, before_rssi, after_ta,
> after_rssil,
> nb_tath, nb_rssith) predict
> FROM
> source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
> """).insert_into("predict_sink")
>
> ====================
> 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
> Traceback (most recent call last):
> File
>
> "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
> line 280, in <module>
> t_env.execute('NT重连预测参数')
> File
>
> "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
> line 1057, in execute
> return JobExecutionResult(self._j_tenv.execute(job_name))
> File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
> 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
> File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
> line 147, in deco
> return f(*a, **kw)
> File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
> in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
> : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>
> ====================
> 这段SQL可以执行
> t_env.sql_query("""
> SELECT
> hotime ,
> before_ta ,
> before_rssi ,
> after_ta ,
> after_rssil ,
> nb_tath ,
> nb_rssith ,
> train_and_predict(hotime, before_ta, before_rssi, after_ta,
> after_rssil,
> nb_tath, nb_rssith) predict
> FROM
> source
> """).insert_into("print_table")
>
> ------------------------------
> +I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
> +I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
> ------------------------------
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>