Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/03/30 06:40:00 UTC

[jira] [Created] (FLINK-26920) It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."

Dian Fu created FLINK-26920:
-------------------------------

             Summary: It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
                 Key: FLINK-26920
                 URL: https://issues.apache.org/jira/browse/FLINK-26920
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.15.0
            Reporter: Dian Fu


For the following code:
{code}
import numpy as np
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.table import StreamTableEnvironment
from sklearn import svm, datasets

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Table Source
t_env.execute_sql("""
    CREATE TABLE my_source (
        a FLOAT,
        key STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.a.min' = '4.3',
        'fields.a.max' = '7.9',
        'fields.key.length' = '10'
    )
""")


def process_type():
    return Types.ROW_NAMED(
        ["a", "key"],
        [Types.FLOAT(), Types.STRING()]
    )


# append only datastream
ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    process_type())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        clf = svm.SVC()
        X, y = datasets.load_iris(return_X_y=True)
        clf.fit(X, y)

        self.model = clf


    def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):

        # Query Redis for the round settlement log by role_id + space

        features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
        predict = int(self.model.predict(features)[0])

        yield Row(predict=predict, role_id=value['key'])


        
ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
    .process(
        MyKeyedProcessFunction(),
        output_type=Types.ROW_NAMED(
            ["hit", "role_id"],
            [Types.INT(), Types.STRING()]
        ))


# Use a table sink
t_env.execute_sql("""
        CREATE TABLE my_sink (
          hit INT,
          role_id STRING
        ) WITH (
          'connector' = 'print'
        )
    """)

t_env.create_temporary_view("predict", ds)
t_env.execute_sql("""
    INSERT INTO my_sink
    SELECT * FROM predict
""").wait()
{code}

It fails with the following exception:
{code}
Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
	at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
	at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)
{code}
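
The exception text hints that the consumer type "Python" may be missing from, or set to 0 in, "taskmanager.memory.managed.consumer-weights". As a rough diagnostic aid, the sketch below checks a weights string for a positive Python entry and mirrors the (0, 1] range check with the value actually substituted into the message. It is not Flink's implementation: the helper names are hypothetical, and the comma-separated NAME:weight format and the sample values are assumptions made for illustration.

```python
def parse_consumer_weights(value):
    """Parse a string such as 'OPERATOR:70,STATE_BACKEND:70,PYTHON:30'.

    Assumption: entries are comma-separated NAME:weight pairs.
    """
    weights = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, weight = entry.partition(":")
        weights[name.strip().upper()] = int(weight)
    return weights


def python_weight_is_usable(value):
    """Return True if the string has a PYTHON entry with a weight above 0."""
    return parse_consumer_weights(value).get("PYTHON", 0) > 0


def check_managed_memory_fraction(fraction):
    """Hypothetical mirror of the range check, with the value formatted in."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError(
            "The configured managed memory fraction for Python worker process "
            "must be within (0, 1], was: %s." % fraction
        )


if __name__ == "__main__":
    # Sample values are illustrative, not taken from the failing job.
    print(python_weight_is_usable("OPERATOR:70,STATE_BACKEND:70,PYTHON:30"))
    print(python_weight_is_usable("OPERATOR:70,STATE_BACKEND:70"))
```

If the weights string passes this check and the job still reports a fraction of 0.0, the configuration value alone is probably not the cause.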



--
This message was sent by Atlassian Jira
(v8.20.1#820001)