Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/04/01 11:43:00 UTC
[jira] [Closed] (FLINK-26920) Job executes failed with "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
[ https://issues.apache.org/jira/browse/FLINK-26920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu closed FLINK-26920.
---------------------------
Fix Version/s: (was: 1.13.7)
Resolution: Fixed
Fixed in:
- master via 39854d1b70c2234f4079f2e7c846eef81902aec4
- release-1.15 via 4a3d6e52fe820e53b7f3d9bf301b4a1c7d14ab41
- release-1.14 via 73d140e3544f65f759bf6c7ca3a7163702386bfc
> Job executes failed with "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26920
> URL: https://issues.apache.org/jira/browse/FLINK-26920
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.12.0, 1.13.0, 1.14.0
> Reporter: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0, 1.14.5
>
>
> For the following code:
> {code}
> import numpy as np
> from pyflink.common import Row
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
> from pyflink.table import StreamTableEnvironment
> from sklearn import svm, datasets
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(stream_execution_environment=env)
> # Table Source
> t_env.execute_sql("""
>     CREATE TABLE my_source (
>       a FLOAT,
>       key STRING
>     ) WITH (
>       'connector' = 'datagen',
>       'rows-per-second' = '1',
>       'fields.a.min' = '4.3',
>       'fields.a.max' = '7.9',
>       'fields.key.length' = '10'
>     )
> """)
> def process_type():
>     return Types.ROW_NAMED(
>         ["a", "key"],
>         [Types.FLOAT(), Types.STRING()]
>     )
> # append only datastream
> ds = t_env.to_append_stream(
>     t_env.from_path('my_source'),
>     process_type())
> class MyKeyedProcessFunction(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         clf = svm.SVC()
>         X, y = datasets.load_iris(return_X_y=True)
>         clf.fit(X, y)
>         self.model = clf
>     def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):
>         # Query the round settlement log from redis by role_id + space
>         features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
>         predict = int(self.model.predict(features)[0])
>         yield Row(predict=predict, role_id=value['key'])
>
> ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
>     .process(
>         MyKeyedProcessFunction(),
>         output_type=Types.ROW_NAMED(
>             ["hit", "role_id"],
>             [Types.INT(), Types.STRING()]
>         ))
> # Use a table sink
> t_env.execute_sql("""
>     CREATE TABLE my_sink (
>       hit INT,
>       role_id STRING
>     ) WITH (
>       'connector' = 'print'
>     )
> """)
> t_env.create_temporary_view("predict", ds)
> t_env.execute_sql("""
>     INSERT INTO my_sink
>     SELECT * FROM predict
> """).wait()
> {code}
> It reported the following exception:
> {code}
> Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
> at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
> at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
> at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> {code}
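
The exception message names two conditions: the fraction must lie within (0, 1], and the consumer type "Python" must be present with a non-zero weight in "taskmanager.memory.managed.consumer-weights". The following is a minimal Python sketch of those two checks, to make the message easier to read. It is not Flink's implementation and it is not the fix for this issue (the fix is in the commits listed above). The function names and the sample weight strings are hypothetical, and the 'NAME:weight,NAME:weight' layout is assumed here for illustration only.

```python
from typing import Dict


def parse_consumer_weights(raw: str) -> Dict[str, int]:
    """Parse an assumed 'NAME:weight,NAME:weight' string into a dict.

    Hypothetical helper for illustration; not part of Flink or PyFlink.
    """
    weights: Dict[str, int] = {}
    for entry in raw.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate empty segments such as a trailing comma
        name, _, weight = entry.partition(":")
        weights[name.strip().upper()] = int(weight)
    return weights


def python_weight_is_usable(raw: str) -> bool:
    """Return True if the PYTHON consumer is present with a weight above 0."""
    return parse_consumer_weights(raw).get("PYTHON", 0) > 0


def check_python_fraction(fraction: float) -> None:
    """Enforce the (0, 1] range named in the exception message."""
    if not (0.0 < fraction <= 1.0):
        raise ValueError(
            "managed memory fraction for the Python worker must be "
            f"within (0, 1], was: {fraction}"
        )


if __name__ == "__main__":
    # Sample strings are illustrative, not authoritative defaults.
    print(python_weight_is_usable("OPERATOR:70,STATE_BACKEND:70,PYTHON:30"))
    print(python_weight_is_usable("OPERATOR:70,STATE_BACKEND:70"))
    check_python_fraction(0.3)
```

In the trace above the reported fraction is 0.0, so a check of this shape rejects it no matter what the job code does. That suggests looking at how the fraction is assigned to the Python operator rather than at the user function.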
--
This message was sent by Atlassian Jira
(v8.20.1#820001)