Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/04/01 11:43:00 UTC

[jira] [Closed] (FLINK-26920) Job execution fails with "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."

     [ https://issues.apache.org/jira/browse/FLINK-26920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu closed FLINK-26920.
---------------------------
    Fix Version/s:     (was: 1.13.7)
       Resolution: Fixed

Fixed in:
- master via 39854d1b70c2234f4079f2e7c846eef81902aec4
- release-1.15 via 4a3d6e52fe820e53b7f3d9bf301b4a1c7d14ab41
- release-1.14 via 73d140e3544f65f759bf6c7ca3a7163702386bfc

> Job execution fails with "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26920
>                 URL: https://issues.apache.org/jira/browse/FLINK-26920
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.12.0, 1.13.0, 1.14.0
>            Reporter: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.14.5
>
>
> For the following code:
> {code}
> import numpy as np
> from pyflink.common import Row
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
> from pyflink.table import StreamTableEnvironment
> from sklearn import svm, datasets
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(stream_execution_environment=env)
> # Table Source
> t_env.execute_sql("""
>     CREATE TABLE my_source (
>         a FLOAT,
>         key STRING
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '1',
>         'fields.a.min' = '4.3',
>         'fields.a.max' = '7.9',
>         'fields.key.length' = '10'
>     )
> """)
> def process_type():
>     return Types.ROW_NAMED(
>         ["a", "key"],
>         [Types.FLOAT(), Types.STRING()]
>     )
> # append only datastream
> ds = t_env.to_append_stream(
>     t_env.from_path('my_source'),
>     process_type())
> class MyKeyedProcessFunction(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         clf = svm.SVC()
>         X, y= datasets.load_iris(return_X_y=True)
>         clf.fit(X, y)
>         self.model = clf
>     def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):
>         # Query the round settlement log from Redis based on role_id + space
>         features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
>         predict = int(self.model.predict(features)[0])
>         yield Row(predict=predict, role_id=value['key'])
>         
> ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
>     .process(
>         MyKeyedProcessFunction(), 
>         output_type=Types.ROW_NAMED(
>             ["hit", "role_id"],
>             [Types.INT(), Types.STRING()]
>     ))
> # Use a Table sink
> t_env.execute_sql("""
>         CREATE TABLE my_sink (
>           hit INT,
>           role_id STRING
>         ) WITH (
>           'connector' = 'print'
>         )
>     """)
> t_env.create_temporary_view("predict", ds)
> t_env.execute_sql("""
>     INSERT INTO my_sink
>     SELECT * FROM predict
> """).wait()
> {code}
> It reported the following exception:
> {code}
> Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
> 	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> 	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
> 	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
> 	at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
> 	at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
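> The message points at the "taskmanager.memory.managed.consumer-weights" option. Purely as an illustration of that option (an assumption based on the error text, not a workaround verified against this ticket, whose root cause is addressed by the linked fix), the weight for the PYTHON consumer could be declared explicitly before building the pipeline:
> {code}
> # Illustration only, not from this ticket: set the option named in the error
> # message, re-using the t_env created in the snippet above. The consumer keys
> # and weights shown here are examples; the defaults differ between Flink
> # versions, and depending on the deployment this option may need to be set in
> # flink-conf.yaml instead of programmatically.
> t_env.get_config().get_configuration().set_string(
>     "taskmanager.memory.managed.consumer-weights",
>     "DATAPROC:70,PYTHON:30")
> {code}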



--
This message was sent by Atlassian Jira
(v8.20.1#820001)