You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/03/30 06:40:00 UTC
[jira] [Created] (FLINK-26920) It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
Dian Fu created FLINK-26920:
-------------------------------
Summary: It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
Key: FLINK-26920
URL: https://issues.apache.org/jira/browse/FLINK-26920
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.15.0
Reporter: Dian Fu
For the following code:
{code}
import numpy as np
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.table import StreamTableEnvironment
from sklearn import svm, datasets
# Create the DataStream execution environment and a Table environment on top of it.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# Table source: a `datagen` table configured for one row per second, with
# column `a` bounded to [4.3, 7.9] and column `key` of length 10.
t_env.execute_sql("""
CREATE TABLE my_source (
a FLOAT,
key STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.a.min' = '4.3',
'fields.a.max' = '7.9',
'fields.key.length' = '10'
)
""")
def process_type():
    """Return the row type used when converting ``my_source`` to a DataStream.

    Returns:
        A ``Types.ROW_NAMED`` type with a FLOAT field ``a`` and a STRING
        field ``key``, mirroring the columns declared for ``my_source``.
    """
    return Types.ROW_NAMED(
        ["a", "key"],
        [Types.FLOAT(), Types.STRING()]
    )
# Convert the `my_source` table into an append-only DataStream whose rows
# use the type returned by process_type().
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
process_type())
class MyKeyedProcessFunction(KeyedProcessFunction):
    """Keyed process function that classifies each row with an SVM.

    The classifier is fitted on the iris data set in ``open`` and then used in
    ``process_element`` to predict a class for every incoming row.
    """

    def open(self, runtime_context: RuntimeContext):
        """Fit an ``svm.SVC`` on the iris data set and store it on ``self.model``.

        Args:
            runtime_context: Runtime context passed in by the framework; it is
                not used here.
        """
        clf = svm.SVC()
        X, y = datasets.load_iris(return_X_y=True)
        clf.fit(X, y)
        self.model = clf

    def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):
        """Predict a class for ``value`` and yield it together with the row key.

        Args:
            value: Input row; its ``a`` and ``key`` fields are read.
            ctx: Processing context; it is not used here.

        Yields:
            ``Row(predict=<int class>, role_id=<value['key']>)``.
        """
        # Original comment (translated): "query the round-settlement log from
        # redis by role_id + space".
        # NOTE(review): no redis lookup happens here; the comment looks stale.
        # The feature vector is the row's `a` plus three fixed values, shaped
        # as a single sample (1 x 4) for predict().
        features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
        predict = int(self.model.predict(features)[0])
        yield Row(predict=predict, role_id=value['key'])
# Key the stream by the `key` field and apply MyKeyedProcessFunction; the
# declared output row type is (hit INT, role_id STRING).
# NOTE(review): process_element yields Row(predict=..., role_id=...), while the
# output type names the first field "hit" - confirm fields are matched by position.
ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
.process(
MyKeyedProcessFunction(),
output_type=Types.ROW_NAMED(
["hit", "role_id"],
[Types.INT(), Types.STRING()]
))
# Use a table sink: `my_sink` is declared with the `print` connector.
t_env.execute_sql("""
CREATE TABLE my_sink (
hit INT,
role_id STRING
) WITH (
'connector' = 'print'
)
""")
# Register the processed stream as the temporary view `predict`, then insert
# its rows into `my_sink` and wait on the returned result.
t_env.create_temporary_view("predict", ds)
t_env.execute_sql("""
INSERT INTO my_sink
SELECT * FROM predict
""").wait()
{code}
Running this job fails with the following exception:
{code}
Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)