Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/25 08:31:03 UTC

[GitHub] [flink] dianfu commented on a change in pull request #18418: [FLINK-25719][python] Support General Python UDF in Thread Mode

dianfu commented on a change in pull request #18418:
URL: https://github.com/apache/flink/pull/18418#discussion_r790681717



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
##########
@@ -158,6 +158,20 @@ public static boolean isPythonWorkerUsingManagedMemory(Configuration config) {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public static boolean isPythonWorkerInProcessMode(Configuration config) {
+        Class clazz = loadClass("org.apache.flink.python.PythonOptions");
+        try {
+            return config.getString(
+                            (ConfigOption<String>)
+                                    (clazz.getField("PYTHON_EXECUTION_MODE").get(null)))
+                    .equals("process");

Review comment:
       ```suggestion
                       .equalsIgnoreCase("process");
       ```

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/pemja/EmbeddedPythonEnvironmentManager.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.pemja;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The base class of python environment manager which is used to create the PythonEnvironment
+ * object. It's used to run python UDF in embedded Python environment.
+ */
+@Internal
+public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(EmbeddedPythonEnvironmentManager.class);
+
+    public EmbeddedPythonEnvironmentManager(
+            PythonDependencyInfo dependencyInfo,
+            String[] tmpDirectories,
+            Map<String, String> systemEnv,
+            JobID jobID) {
+        super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+    }
+
+    @Override
+    public PythonEnvironment createEnvironment() throws Exception {
+        Map<String, String> env = new HashMap<>(resource.env);
+
+        PythonInterpreterConfig.ExecType execType;
+        if (dependencyInfo.getThreadModeExecType().equals("sub-interpreter")) {

Review comment:
       ```suggestion
           if (dependencyInfo.getThreadModeExecType().equalsIgnoreCase("sub-interpreter")) {
       ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironment;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironmentManager;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import pemja.core.PythonInterpreter;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in embedded Python
+ * environment.
+ */
+@Internal
+public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ReentrantLock lock = new ReentrantLock();
+
+    private static Map<JobID, Tuple2<String, Integer>> workingDirectories = new HashMap<>();
+
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** Every operator will hold the only python interpreter. */
+    protected transient PythonInterpreter interpreter;
+
+    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
+
+    public AbstractEmbeddedPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        pythonConfig = new PythonConfig(config);
+        pythonEnvironmentManager = createPythonEnvironmentManager();
+        pythonEnvironmentManager.open();
+        EmbeddedPythonEnvironment environment =
+                (EmbeddedPythonEnvironment) pythonEnvironmentManager.createEnvironment();
+
+        PythonInterpreterConfig interpreterConfig = environment.getConfig();
+        interpreter = new PythonInterpreter(interpreterConfig);
+        Map<String, String> env = environment.getEnv();
+
+        if (env.containsKey(PYTHON_WORKING_DIR)) {
+            lock.lockInterruptibly();
+
+            JobID jobId = getRuntimeContext().getJobId();
+            Tuple2<String, Integer> dirAndNums;
+
+            if (workingDirectories.containsKey(jobId)) {
+                dirAndNums = workingDirectories.get(jobId);
+            } else {
+                dirAndNums = Tuple2.of(null, 0);
+                workingDirectories.put(jobId, dirAndNums);
+            }
+            dirAndNums.f1 += 1;
+
+            if (dirAndNums.f0 == null) {
+                // get current directory.
+                interpreter.exec("import os;cwd = os.getcwd();");
+                dirAndNums.f0 = interpreter.get("cwd", String.class);
+                String workingDirectory = env.get(PYTHON_WORKING_DIR);
+                // set working directory
+                interpreter.exec(String.format("import os;os.chdir('%s')", workingDirectory));
+            }
+
+            lock.unlock();
+        }
+
+        if (interpreterConfig
+                .getExecType()
+                .equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {
+            LOG.info("Create Operation in multi interpreters.");
+            createOperationInMultiInterpreters(pythonConfig.getPythonExec(), env);
+        } else {
+            LOG.info("Create Operation in multi threads.");
+            createOperationInMultiThread();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            JobID jobId = getRuntimeContext().getJobId();
+            if (workingDirectories.containsKey(jobId)) {
+                lock.lockInterruptibly();
+                Tuple2<String, Integer> dirAndNums = workingDirectories.get(jobId);
+                dirAndNums.f1 -= 1;
+                if (dirAndNums.f1 == 0) {
+                    // change to previous working directory.
+                    interpreter.exec(String.format("import os;os.chdir('%s')", dirAndNums.f0));
+                }
+                lock.unlock();

Review comment:
       move the unlock to a finally block; see the sketch below
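
   For illustration, the restructured close path could look like the sketch below (based on the diff above; not the final code):

   ```java
   JobID jobId = getRuntimeContext().getJobId();
   if (workingDirectories.containsKey(jobId)) {
       lock.lockInterruptibly();
       try {
           Tuple2<String, Integer> dirAndNums = workingDirectories.get(jobId);
           dirAndNums.f1 -= 1;
           if (dirAndNums.f1 == 0) {
               // change back to the previous working directory before releasing the lock.
               interpreter.exec(String.format("import os;os.chdir('%s')", dirAndNums.f0));
           }
       } finally {
           // the lock is released even if the interpreter call throws.
           lock.unlock();
       }
   }
   ```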

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/pemja/EmbeddedPythonEnvironmentManager.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.pemja;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The base class of python environment manager which is used to create the PythonEnvironment
+ * object. It's used to run python UDF in embedded Python environment.
+ */
+@Internal
+public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+    private static final Logger LOG =

Review comment:
       not used

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironment;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironmentManager;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import pemja.core.PythonInterpreter;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in embedded Python
+ * environment.
+ */
+@Internal
+public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ReentrantLock lock = new ReentrantLock();
+
+    private static Map<JobID, Tuple2<String, Integer>> workingDirectories = new HashMap<>();
+
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** Every operator will hold the only python interpreter. */
+    protected transient PythonInterpreter interpreter;
+
+    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
+
+    public AbstractEmbeddedPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        pythonConfig = new PythonConfig(config);
+        pythonEnvironmentManager = createPythonEnvironmentManager();
+        pythonEnvironmentManager.open();
+        EmbeddedPythonEnvironment environment =
+                (EmbeddedPythonEnvironment) pythonEnvironmentManager.createEnvironment();
+
+        PythonInterpreterConfig interpreterConfig = environment.getConfig();
+        interpreter = new PythonInterpreter(interpreterConfig);
+        Map<String, String> env = environment.getEnv();
+
+        if (env.containsKey(PYTHON_WORKING_DIR)) {
+            lock.lockInterruptibly();
+
+            JobID jobId = getRuntimeContext().getJobId();
+            Tuple2<String, Integer> dirAndNums;
+
+            if (workingDirectories.containsKey(jobId)) {
+                dirAndNums = workingDirectories.get(jobId);
+            } else {
+                dirAndNums = Tuple2.of(null, 0);
+                workingDirectories.put(jobId, dirAndNums);
+            }
+            dirAndNums.f1 += 1;
+
+            if (dirAndNums.f0 == null) {
+                // get current directory.
+                interpreter.exec("import os;cwd = os.getcwd();");
+                dirAndNums.f0 = interpreter.get("cwd", String.class);
+                String workingDirectory = env.get(PYTHON_WORKING_DIR);
+                // set working directory
+                interpreter.exec(String.format("import os;os.chdir('%s')", workingDirectory));
+            }
+
+            lock.unlock();

Review comment:
       move the unlock to a finally clause; see the sketch below
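
   A corresponding sketch for this open path, with the same try/finally treatment (illustrative only; identifiers are taken from the diff above):

   ```java
   if (env.containsKey(PYTHON_WORKING_DIR)) {
       lock.lockInterruptibly();
       try {
           JobID jobId = getRuntimeContext().getJobId();
           Tuple2<String, Integer> dirAndNums =
                   workingDirectories.computeIfAbsent(jobId, id -> Tuple2.of(null, 0));
           dirAndNums.f1 += 1;
           if (dirAndNums.f0 == null) {
               // remember the original working directory before switching.
               interpreter.exec("import os;cwd = os.getcwd();");
               dirAndNums.f0 = interpreter.get("cwd", String.class);
               interpreter.exec(
                       String.format("import os;os.chdir('%s')", env.get(PYTHON_WORKING_DIR)));
           }
       } finally {
           lock.unlock();
       }
   }
   ```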

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator;
+import org.apache.flink.streaming.api.utils.ProtoUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The Python {@link ScalarFunction} operator in embedded Python environment. */
+@Internal
+public class EmbeddedPythonScalarFunctionOperator
+        extends AbstractEmbeddedPythonFunctionOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The Python {@link ScalarFunction}s to be executed. */
+    private final PythonFunctionInfo[] scalarFunctions;
+
+    /** The offsets of user-defined function inputs. */
+    private final int[] udfInputOffsets;
+
+    /** The input logical type. */
+    protected final RowType inputType;
+
+    /** The user-defined function input logical type. */
+    protected final RowType udfInputType;
+
+    /** The user-defined function output logical type. */
+    protected final RowType udfOutputType;
+
+    private GeneratedProjection forwardedFieldGeneratedProjection;
+
+    /** The GenericRowData reused holding the execution result of python udf. */
+    private GenericRowData reuseResultRowData;
+
+    /** The collector used to collect records. */
+    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
+
+    /** The Projection which projects the forwarded fields from the input row. */
+    private transient Projection<RowData, BinaryRowData> forwardedFieldProjection;
+
+    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionInputConverters;
+    private transient Object[] userDefinedFunctionInputArgs;
+    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionOutputConverters;
+
+    /** Whether there is only one input argument. */
+    private transient boolean isOneArg;
+
+    /** Whether is only one field of udf result. */
+    private transient boolean isOneFieldResult;
+
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets) {
+        super(config);
+        this.inputType = Preconditions.checkNotNull(inputType);
+        this.udfInputType = Preconditions.checkNotNull(udfInputType);
+        this.udfOutputType = Preconditions.checkNotNull(udfOutputType);
+        this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets);
+        this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
+    }
+
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets,
+            GeneratedProjection forwardedFieldGeneratedProjection) {
+        this(config, scalarFunctions, inputType, udfInputType, udfOutputType, udfInputOffsets);
+        this.forwardedFieldGeneratedProjection =
+                Preconditions.checkNotNull(forwardedFieldGeneratedProjection);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void open() throws Exception {
+        isOneArg = udfInputOffsets.length == 1;
+        isOneFieldResult = udfOutputType.getFieldCount() == 1;
+        super.open();
+        rowDataWrapper = new StreamRecordRowDataWrappingCollector(output);
+        reuseResultRowData = new GenericRowData(udfOutputType.getFieldCount());
+        RowType userDefinedFunctionInputType =
+                new RowType(
+                        Arrays.stream(udfInputOffsets)
+                                .mapToObj(i -> inputType.getFields().get(i))
+                                .collect(Collectors.toList()));
+        userDefinedFunctionInputConverters =
+                userDefinedFunctionInputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(PythonTypeUtils::toDataConverter)
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+        userDefinedFunctionInputArgs = new Object[udfInputOffsets.length];
+        userDefinedFunctionOutputConverters =
+                udfOutputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(PythonTypeUtils::toDataConverter)
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+
+        if (forwardedFieldGeneratedProjection != null) {
+            forwardedFieldProjection =
+                    forwardedFieldGeneratedProjection.newInstance(
+                            Thread.currentThread().getContextClassLoader());
+        }
+
+        // invoke `open` method of ScalarOperation.
+        interpreter.invokeMethod("scalar_operation", "open");
+    }
+
+    @Override
+    public void createOperationInMultiThread() {
+        interpreter.exec(
+                "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
+        interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+        interpreter.exec(
+                String.format(
+                        "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
+                        isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
+    }
+
+    @Override
+    public void createOperationInMultiInterpreters(String pythonExecutable, Map<String, String> env)
+            throws IOException {
+        String[] commands =
+                new String[] {
+                    pythonExecutable,
+                    "-c",
+                    String.format(
+                            "from pyflink.fn_execution.utils.operation_utils import create_serialized_scalar_operation_from_proto;"
+                                    + "print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
+                            Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
+                            isOneArg ? "True" : "False",
+                            isOneFieldResult ? "True" : "False")
+                };
+        interpreter.exec(
+                "from pyflink.fn_execution.utils.operation_utils import deserialized_operation_from_serialized_bytes");
+        interpreter.exec(
+                String.format(
+                        "scalar_operation = deserialized_operation_from_serialized_bytes(%s)",
+                        executeScript(commands, env)));
+    }
+
+    @Override
+    public void endInput() {
+        if (interpreter != null) {
+            // invoke `close` method of ScalarOperation.
+            interpreter.invokeMethod("scalar_operation", "close");
+        }
+    }
+
+    @Override
+    public PythonEnv getPythonEnv() {
+        return scalarFunctions[0].getPythonFunction().getPythonEnv();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void processElement(StreamRecord<RowData> element) {
+        RowData value = element.getValue();
+
+        Object udfArgs = null;
+        if (userDefinedFunctionInputArgs.length > 1) {
+            for (int i = 0; i < userDefinedFunctionInputArgs.length; i++) {
+                userDefinedFunctionInputArgs[i] =
+                        userDefinedFunctionInputConverters[i].toExternal(value, udfInputOffsets[i]);
+            }
+            udfArgs = userDefinedFunctionInputArgs;
+        } else if (userDefinedFunctionInputArgs.length == 1) {
+            udfArgs = userDefinedFunctionInputConverters[0].toExternal(value, udfInputOffsets[0]);
+        }
+
+        if (isOneFieldResult) {
+            Object udfResult =
+                    interpreter.invokeMethod("scalar_operation", "process_element", udfArgs);
+            reuseResultRowData.setField(
+                    0, userDefinedFunctionOutputConverters[0].toInternal(udfResult));
+        } else {
+            Object[] udfResult =
+                    (Object[])
+                            interpreter.invokeMethod(
+                                    "scalar_operation", "process_element", udfArgs);
+            for (int i = 0; i < udfResult.length; i++) {
+                reuseResultRowData.setField(
+                        i, userDefinedFunctionOutputConverters[i].toInternal(udfResult[i]));
+            }
+        }
+
+        if (forwardedFieldProjection != null) {
+            BinaryRowData forwardedRowData = forwardedFieldProjection.apply(value).copy();
+            JoinedRowData reuseJoinedRow = new JoinedRowData(forwardedRowData, reuseResultRowData);
+            rowDataWrapper.collect(reuseJoinedRow);
+        } else {
+            rowDataWrapper.collect(reuseResultRowData);
+        }
+    }
+
+    @Override
+    protected void invokeFinishBundle() throws Exception {
+        // TODO: Support batches invoking.
+        if (elementCount > 0) {

Review comment:
       could be removed for now?

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/pemja/EmbeddedPythonEnvironmentManager.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.pemja;

Review comment:
       Also move the process-related classes into package `org.apache.flink.python.env.process`?

##########
File path: flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
##########
@@ -214,4 +214,25 @@
                                     + "in each batch when iterating a Python MapState. Note that this is an experimental flag "
                                     + "and might not be available "
                                     + "in future releases.");
+
+    /** Specify the python runtime execution mode. */
+    @Experimental
+    public static final ConfigOption<String> PYTHON_EXECUTION_MODE =
+            ConfigOptions.key("python.execution-mode")
+                    .defaultValue("process")

Review comment:
       Does it make sense to restrict the available values to the following, to avoid introducing another configuration option PYTHON_THREAD_MODE_EXECUTION_TYPE (see the sketch after this list)?
   - process
   - multi-thread
   - sub-interpreter
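
   A sketch of what the consolidated option could look like, reusing the builder style already used in the diff (the description text is illustrative, not final wording):

   ```java
   @Experimental
   public static final ConfigOption<String> PYTHON_EXECUTION_MODE =
           ConfigOptions.key("python.execution-mode")
                   .defaultValue("process")
                   .withDescription(
                           "Specify the python runtime execution mode. The optional values are "
                                   + "'process', 'multi-thread' and 'sub-interpreter'.");
   ```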

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/pemja/EmbeddedPythonEnvironmentManager.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.pemja;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The base class of python environment manager which is used to create the PythonEnvironment
+ * object. It's used to run python UDF in embedded Python environment.
+ */
+@Internal
+public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(EmbeddedPythonEnvironmentManager.class);
+
+    public EmbeddedPythonEnvironmentManager(
+            PythonDependencyInfo dependencyInfo,
+            String[] tmpDirectories,
+            Map<String, String> systemEnv,
+            JobID jobID) {
+        super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+    }
+
+    @Override
+    public PythonEnvironment createEnvironment() throws Exception {
+        Map<String, String> env = new HashMap<>(resource.env);

Review comment:
       ```suggestion
           Map<String, String> env = new HashMap<>(getPythonEnv());
       ```

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/ExternalPythonEnvironmentManager.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.PythonFunctionRunner;
+
+import java.util.Map;
+
+/**
+ * The base class of python environment manager which is used to create the PythonEnvironment object
+ * and the RetrievalToken. It's used when the python function runner is configured to run python
+ * UDF in external environment.
+ */
+@Internal
+public abstract class ExternalPythonEnvironmentManager extends AbstractPythonEnvironmentManager {

Review comment:
       It seems that it's not necessary to introduce this abstract class?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironment;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironmentManager;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import pemja.core.PythonInterpreter;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in embedded Python
+ * environment.
+ */
+@Internal
+public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ReentrantLock lock = new ReentrantLock();
+
+    private static Map<JobID, Tuple2<String, Integer>> workingDirectories = new HashMap<>();
+
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** Every operator will hold the only python interpreter. */
+    protected transient PythonInterpreter interpreter;
+
+    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
+
+    public AbstractEmbeddedPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        pythonConfig = new PythonConfig(config);
+        pythonEnvironmentManager = createPythonEnvironmentManager();
+        pythonEnvironmentManager.open();
+        EmbeddedPythonEnvironment environment =
+                (EmbeddedPythonEnvironment) pythonEnvironmentManager.createEnvironment();
+
+        PythonInterpreterConfig interpreterConfig = environment.getConfig();
+        interpreter = new PythonInterpreter(interpreterConfig);
+        Map<String, String> env = environment.getEnv();
+
+        if (env.containsKey(PYTHON_WORKING_DIR)) {
+            lock.lockInterruptibly();
+
+            JobID jobId = getRuntimeContext().getJobId();
+            Tuple2<String, Integer> dirAndNums;
+
+            if (workingDirectories.containsKey(jobId)) {
+                dirAndNums = workingDirectories.get(jobId);
+            } else {
+                dirAndNums = Tuple2.of(null, 0);
+                workingDirectories.put(jobId, dirAndNums);
+            }
+            dirAndNums.f1 += 1;
+
+            if (dirAndNums.f0 == null) {
+                // get current directory.
+                interpreter.exec("import os;cwd = os.getcwd();");
+                dirAndNums.f0 = interpreter.get("cwd", String.class);
+                String workingDirectory = env.get(PYTHON_WORKING_DIR);
+                // set working directory
+                interpreter.exec(String.format("import os;os.chdir('%s')", workingDirectory));
+            }
+
+            lock.unlock();
+        }
+
+        if (interpreterConfig
+                .getExecType()
+                .equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {

Review comment:
       equalsIgnoreCase

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/pemja/EmbeddedPythonEnvironmentManager.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.pemja;

Review comment:
       ```suggestion
   package org.apache.flink.python.env.embedded;
       ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator;
+import org.apache.flink.streaming.api.utils.ProtoUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The Python {@link ScalarFunction} operator in embedded Python environment. */
+@Internal
+public class EmbeddedPythonScalarFunctionOperator
+        extends AbstractEmbeddedPythonFunctionOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The Python {@link ScalarFunction}s to be executed. */
+    private final PythonFunctionInfo[] scalarFunctions;
+
+    /** The offsets of user-defined function inputs. */
+    private final int[] udfInputOffsets;
+
+    /** The input logical type. */
+    protected final RowType inputType;
+
+    /** The user-defined function input logical type. */
+    protected final RowType udfInputType;
+
+    /** The user-defined function output logical type. */
+    protected final RowType udfOutputType;
+
+    private GeneratedProjection forwardedFieldGeneratedProjection;
+
+    /** The GenericRowData reused holding the execution result of python udf. */
+    private GenericRowData reuseResultRowData;
+
+    /** The collector used to collect records. */
+    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
+
+    /** The Projection which projects the forwarded fields from the input row. */
+    private transient Projection<RowData, BinaryRowData> forwardedFieldProjection;
+
+    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionInputConverters;
+    private transient Object[] userDefinedFunctionInputArgs;
+    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionOutputConverters;
+
+    /** Whether there is only one input argument. */
+    private transient boolean isOneArg;
+
+    /** Whether is only one field of udf result. */
+    private transient boolean isOneFieldResult;
+
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets) {
+        super(config);
+        this.inputType = Preconditions.checkNotNull(inputType);
+        this.udfInputType = Preconditions.checkNotNull(udfInputType);
+        this.udfOutputType = Preconditions.checkNotNull(udfOutputType);
+        this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets);
+        this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
+    }
+
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets,
+            GeneratedProjection forwardedFieldGeneratedProjection) {
+        this(config, scalarFunctions, inputType, udfInputType, udfOutputType, udfInputOffsets);
+        this.forwardedFieldGeneratedProjection =
+                Preconditions.checkNotNull(forwardedFieldGeneratedProjection);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void open() throws Exception {
+        isOneArg = udfInputOffsets.length == 1;
+        isOneFieldResult = udfOutputType.getFieldCount() == 1;
+        super.open();
+        rowDataWrapper = new StreamRecordRowDataWrappingCollector(output);
+        reuseResultRowData = new GenericRowData(udfOutputType.getFieldCount());
+        RowType userDefinedFunctionInputType =
+                new RowType(
+                        Arrays.stream(udfInputOffsets)
+                                .mapToObj(i -> inputType.getFields().get(i))
+                                .collect(Collectors.toList()));
+        userDefinedFunctionInputConverters =
+                userDefinedFunctionInputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(PythonTypeUtils::toDataConverter)
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+        userDefinedFunctionInputArgs = new Object[udfInputOffsets.length];
+        userDefinedFunctionOutputConverters =
+                udfOutputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(PythonTypeUtils::toDataConverter)
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+
+        if (forwardedFieldGeneratedProjection != null) {
+            forwardedFieldProjection =
+                    forwardedFieldGeneratedProjection.newInstance(
+                            Thread.currentThread().getContextClassLoader());
+        }
+
+        // invoke `open` method of ScalarOperation.
+        interpreter.invokeMethod("scalar_operation", "open");
+    }
+
+    @Override
+    public void createOperationInMultiThread() {
+        interpreter.exec(
+                "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
+        interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+        interpreter.exec(
+                String.format(
+                        "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
+                        isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
+    }
+
+    @Override
+    public void createOperationInMultiInterpreters(String pythonExecutable, Map<String, String> env)
+            throws IOException {
+        String[] commands =
+                new String[] {
+                    pythonExecutable,
+                    "-c",
+                    String.format(
+                            "from pyflink.fn_execution.utils.operation_utils import create_serialized_scalar_operation_from_proto;"
+                                    + "print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
+                            Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
+                            isOneArg ? "True" : "False",
+                            isOneFieldResult ? "True" : "False")
+                };
+        interpreter.exec(
+                "from pyflink.fn_execution.utils.operation_utils import deserialized_operation_from_serialized_bytes");
+        interpreter.exec(
+                String.format(
+                        "scalar_operation = deserialized_operation_from_serialized_bytes(%s)",
+                        executeScript(commands, env)));
+    }
+
+    @Override
+    public void endInput() {
+        if (interpreter != null) {
+            // invoke `close` method of ScalarOperation.
+            interpreter.invokeMethod("scalar_operation", "close");
+        }
+    }
+
+    @Override
+    public PythonEnv getPythonEnv() {
+        return scalarFunctions[0].getPythonFunction().getPythonEnv();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void processElement(StreamRecord<RowData> element) {
+        RowData value = element.getValue();
+
+        Object udfArgs = null;
+        if (userDefinedFunctionInputArgs.length > 1) {
+            for (int i = 0; i < userDefinedFunctionInputArgs.length; i++) {
+                userDefinedFunctionInputArgs[i] =
+                        userDefinedFunctionInputConverters[i].toExternal(value, udfInputOffsets[i]);
+            }
+            udfArgs = userDefinedFunctionInputArgs;
+        } else if (userDefinedFunctionInputArgs.length == 1) {
+            udfArgs = userDefinedFunctionInputConverters[0].toExternal(value, udfInputOffsets[0]);
+        }
+
+        if (isOneFieldResult) {
+            Object udfResult =
+                    interpreter.invokeMethod("scalar_operation", "process_element", udfArgs);
+            reuseResultRowData.setField(
+                    0, userDefinedFunctionOutputConverters[0].toInternal(udfResult));
+        } else {
+            Object[] udfResult =
+                    (Object[])
+                            interpreter.invokeMethod(
+                                    "scalar_operation", "process_element", udfArgs);
+            for (int i = 0; i < udfResult.length; i++) {
+                reuseResultRowData.setField(
+                        i, userDefinedFunctionOutputConverters[i].toInternal(udfResult[i]));
+            }
+        }
+
+        if (forwardedFieldProjection != null) {
+            BinaryRowData forwardedRowData = forwardedFieldProjection.apply(value).copy();
+            JoinedRowData reuseJoinedRow = new JoinedRowData(forwardedRowData, reuseResultRowData);
+            rowDataWrapper.collect(reuseJoinedRow);
+        } else {
+            rowDataWrapper.collect(reuseResultRowData);
+        }
+    }
+
+    @Override
+    protected void invokeFinishBundle() throws Exception {
+        // TODO: Support batches invoking.
+        if (elementCount > 0) {

Review comment:
       could be removed for now?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironment;
+import org.apache.flink.python.env.pemja.EmbeddedPythonEnvironmentManager;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import pemja.core.PythonInterpreter;
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in embedded Python
+ * environment.
+ */
+@Internal
+public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ReentrantLock lock = new ReentrantLock();
+
+    private static Map<JobID, Tuple2<String, Integer>> workingDirectories = new HashMap<>();
+
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** Every operator will hold the only python interpreter. */
+    protected transient PythonInterpreter interpreter;
+
+    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
+
+    public AbstractEmbeddedPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        pythonConfig = new PythonConfig(config);
+        pythonEnvironmentManager = createPythonEnvironmentManager();
+        pythonEnvironmentManager.open();
+        EmbeddedPythonEnvironment environment =
+                (EmbeddedPythonEnvironment) pythonEnvironmentManager.createEnvironment();
+
+        PythonInterpreterConfig interpreterConfig = environment.getConfig();
+        interpreter = new PythonInterpreter(interpreterConfig);
+        Map<String, String> env = environment.getEnv();
+
+        if (env.containsKey(PYTHON_WORKING_DIR)) {
+            lock.lockInterruptibly();
+
+            JobID jobId = getRuntimeContext().getJobId();
+            Tuple2<String, Integer> dirAndNums;
+
+            if (workingDirectories.containsKey(jobId)) {
+                dirAndNums = workingDirectories.get(jobId);
+            } else {
+                dirAndNums = Tuple2.of(null, 0);
+                workingDirectories.put(jobId, dirAndNums);
+            }
+            dirAndNums.f1 += 1;
+
+            if (dirAndNums.f0 == null) {
+                // get current directory.
+                interpreter.exec("import os;cwd = os.getcwd();");
+                dirAndNums.f0 = interpreter.get("cwd", String.class);
+                String workingDirectory = env.get(PYTHON_WORKING_DIR);
+                // set working directory
+                interpreter.exec(String.format("import os;os.chdir('%s')", workingDirectory));
+            }
+
+            lock.unlock();
+        }
+
+        if (interpreterConfig
+                .getExecType()
+                .equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {
+            LOG.info("Create Operation in multi interpreters.");
+            createOperationInMultiInterpreters(pythonConfig.getPythonExec(), env);
+        } else {
+            LOG.info("Create Operation in multi threads.");
+            createOperationInMultiThread();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            JobID jobId = getRuntimeContext().getJobId();
+            if (workingDirectories.containsKey(jobId)) {
+                lock.lockInterruptibly();
+                Tuple2<String, Integer> dirAndNums = workingDirectories.get(jobId);
+                dirAndNums.f1 -= 1;
+                if (dirAndNums.f1 == 0) {
+                    // change to previous working directory.
+                    interpreter.exec(String.format("import os;os.chdir('%s')", dirAndNums.f0));
+                }
+                lock.unlock();
+            }
+
+            if (interpreter != null) {
+                interpreter.close();
+            }
+
+            if (pythonEnvironmentManager != null) {
+                pythonEnvironmentManager.close();
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    @Override
+    protected EmbeddedPythonEnvironmentManager createPythonEnvironmentManager() {
+        PythonDependencyInfo dependencyInfo =
+                PythonDependencyInfo.create(
+                        pythonConfig, getRuntimeContext().getDistributedCache());
+        return new EmbeddedPythonEnvironmentManager(
+                dependencyInfo,
+                getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
+                new HashMap<>(System.getenv()),
+                getRuntimeContext().getJobId());
+    }
+
+    /** Creates the operation in multi-thread mode. */
+    public abstract void createOperationInMultiThread();

Review comment:
       What about renaming it to openPythonInterpreter and merging createOperationInMultiThread and createOperationInMultiInterpreters into one method? A possible shape is sketched below.
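
   A possible shape for the merged hook (the method name and signature are only illustrative, not the final API):

   ```java
   // Hypothetical consolidated hook: open() resolves the exec type and passes it
   // along, so subclasses implement a single method instead of two.
   public abstract void openPythonInterpreter(
           String pythonExecutable,
           Map<String, String> env,
           PythonInterpreterConfig.ExecType execType)
           throws IOException;
   ```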




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org